/*------------------------------------------------------------------------*/
/*  Copyright 2014 Sandia Corporation.                                    */
/*  This software is released under the license detailed                  */
/*  in the file, LICENSE, which is located in the top-level Nalu          */
/*  directory structure                                                   */
/*------------------------------------------------------------------------*/


#include <ActuatorLinePointDrag.h>
#include <FieldTypeDef.h>
#include <NaluParsing.h>
#include <NaluEnv.h>
#include <Realm.h>
#include <Simulation.h>

// master elements
#include <master_element/MasterElement.h>

// stk_mesh/base/fem
#include <stk_mesh/base/BulkData.hpp>
#include <stk_mesh/base/Entity.hpp>
#include <stk_mesh/base/Field.hpp>
#include <stk_mesh/base/FieldParallel.hpp>
#include <stk_mesh/base/GetBuckets.hpp>
#include <stk_mesh/base/Selector.hpp>
#include <stk_mesh/base/MetaData.hpp>
#include <stk_mesh/base/Part.hpp>

// stk_util
#include <stk_util/parallel/ParallelReduce.hpp>

// stk_search
#include <stk_search/CoarseSearch.hpp>
#include <stk_search/IdentProc.hpp>

// basic c++
#include <vector>
#include <map>
#include <string>
#include <stdexcept>
#include <cmath>

namespace sierra{
namespace nalu{

//==========================================================================
// Class Definition
//==========================================================================
// ActuatorLinePointDragInfo - holds all points in the tower specification
//==========================================================================
//--------------------------------------------------------------------------
//-------- constructor -----------------------------------------------------
//--------------------------------------------------------------------------
ActuatorLinePointDragInfo::ActuatorLinePointDragInfo()
  : processorId_(0),
    numPoints_(1),
    turbineName_("machine_one"),
    radius_(0),
    omega_(0.0),
    gaussDecayRadius_(1.5)
{
  // nothing to do
}

//--------------------------------------------------------------------------
//-------- destructor ------------------------------------------------------
//--------------------------------------------------------------------------
ActuatorLinePointDragInfo::~ActuatorLinePointDragInfo()
{
  // nothing to do
}

//==========================================================================
// Class Definition
//==========================================================================
// ActuatorLinePointDragPointInfo - holds individual points information
//==========================================================================
//--------------------------------------------------------------------------
//-------- constructor -----------------------------------------------------
//--------------------------------------------------------------------------
ActuatorLinePointDragPointInfo::ActuatorLinePointDragPointInfo(
  size_t localId,
  Point centroidCoords,
  double radius,
  double omega,
  double gaussDecayRadius,
  double *velocity)
  : localId_(localId),
    centroidCoords_(centroidCoords),
    radius_(radius),
    omega_(omega),
    gaussDecayRadius_(gaussDecayRadius),
    bestX_(1.0e16),
    bestElem_(stk::mesh::Entity())
{
  // initialize point velocity and displacement
  velocity_[0] = velocity[0];
  velocity_[1] = velocity[1];
  velocity_[2] = velocity[2];
}
//--------------------------------------------------------------------------
//-------- destructor ------------------------------------------------------
//--------------------------------------------------------------------------
ActuatorLinePointDragPointInfo::~ActuatorLinePointDragPointInfo()
{
  // nothing to do
}

//==========================================================================
// Class Definition
//==========================================================================
// ActuatorLinePointDrag - assemble source term for subgrin turbine; WIP
//==========================================================================
//--------------------------------------------------------------------------
//-------- constructor -----------------------------------------------------
//--------------------------------------------------------------------------
ActuatorLinePointDrag::ActuatorLinePointDrag(
  Realm &realm,
  const YAML::Node &node)
  : Actuator(realm, node),
    realm_(realm),
    searchMethod_(stk::search::KDTREE),
    actuatorLineGhosting_(NULL),
    needToGhostCount_(0),
    localPointId_(0),
    actuatorLineMotion_(false),
    pi_(acos(-1.0))
{
  // load the data
  load(node);

  /*
    current WIP prototype
    Design concepts:
     1) First and foremost, elements are ghosted to the owning point rank. This
        probably should be changed since the number of elements might be larger
        than the number of points. Therefore, ghosting points to elements is probably
        easier. This will remove the parallel sum contributions from ghosted elements.
        time will tell..

     2) There can be many specifications with the number of points and omega processed.

     3) in the end, we fill the map of ActuatorLinePointDragPointInfo objects and iterate this guy
        to assemble source terms

     4) at present, fake source terms on simple Gaussian weighting

    actuator:
      search_method: stk_octree
      search_target_part: block_1

      specifications:

        - name: machine_one
          radius: 2.0
          omega: 1.0
          gaussian_decay_radius: 1.5
          tip_coodinates: [0.0, 0.0, 0.0]
          tail_coodinates: [0.0, 0.0, 0.0]
  */
}

//--------------------------------------------------------------------------
//-------- destructor ------------------------------------------------------
//--------------------------------------------------------------------------
ActuatorLinePointDrag::~ActuatorLinePointDrag()
{

  // delete data probes specifications vector
  for ( size_t k = 0; k < actuatorLineInfo_.size(); ++k )
    delete actuatorLineInfo_[k];
}

//--------------------------------------------------------------------------
//-------- compute_point_drag ----------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::compute_point_drag(
  const int &nDim,
  const double &pointRadius,
  const double *pointVelocity,
  const double *pointGasVelocity,
  const double &pointGasViscosity,
  const double &pointGasDensity,
  double *pointForce,
  double *pointForceLHS)
{
  // compute magnitude of velocity difference between point and gas
  double vRelMag = 0.0;
  for ( int j = 0; j < nDim; ++j )
    vRelMag += (pointVelocity[j] - pointGasVelocity[j] )*(pointVelocity[j] - pointGasVelocity[j]);
  vRelMag = std::sqrt(vRelMag);

  // Reynolds number and friction factors
  double ReP = 2.0*pointGasDensity*pointRadius*vRelMag/pointGasViscosity;
  double CubeRtReP = (ReP < 1000.) ? std::cbrt(ReP) : 0.0;
  double fD = (ReP < 1000.0) ? (1.0 + CubeRtReP*CubeRtReP/6.0) : (0.424/24.0 * ReP);
  double coef = 6.0*pi_*pointGasViscosity*pointRadius;

  // this is from the fluids perspective, not the psuedo particle

  for ( int j = 0; j < nDim; ++j ) {
    pointForce[j] = coef*fD*(pointVelocity[j] - pointGasVelocity[j]);
    pointForceLHS[j] = coef*fD;
  }
}

//--------------------------------------------------------------------------
//-------- compute_node_drag_given_radius ----------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::compute_node_drag_given_radius(
  const int &nDim,
  const double &radius,
  const double &epsilon,
  const double *pointForce,
  double *nodeDrag)
{
  // gaussian weight based on radius
  double gaussWeight;
  const double pi = acos(-1.0);
  if ( nDim == 2 )
      gaussWeight = (1.0 / (epsilon * epsilon * pi)) * exp(-radius * radius / epsilon * epsilon);
  else
      gaussWeight = (1.0 / (epsilon * epsilon * epsilon * pow(pi,1.5))) * exp(-radius * radius / epsilon * epsilon);

  for ( int j = 0; j < nDim; ++j )
    nodeDrag[j] = pointForce[j]*gaussWeight;
}

//--------------------------------------------------------------------------
//-------- load ------------------------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::load(
  const YAML::Node & y_node)
{
  // check for any data probes
  const YAML::Node y_actuatorLine = y_node["actuator"];
  if (y_actuatorLine) {
    NaluEnv::self().naluOutputP0() << "ActuatorLinePointDrag::load" << std::endl;

    // search specifications
    std::string searchMethodName = "na";
    get_if_present(y_actuatorLine, "search_method", searchMethodName, searchMethodName);

    // determine search method for this pair
    if ( searchMethodName != "stk_kdtree" )
      NaluEnv::self().naluOutputP0() << "ActuatorLinePointDrag::search_method only supports stk_kdtree" 
                                     << std::endl;
    
    // extract the set of from target names; each spec is homogeneous in this respect
    const YAML::Node searchTargets = y_actuatorLine["search_target_part"];
    if (searchTargets.Type() == YAML::NodeType::Scalar) {
      searchTargetNames_.resize(1);
      searchTargetNames_[0] = searchTargets.as<std::string>() ;
    }
    else {
      searchTargetNames_.resize(searchTargets.size());
      for (size_t i=0; i < searchTargets.size(); ++i) {
        searchTargetNames_[i] = searchTargets[i].as<std::string>() ;
      }
    }

    const YAML::Node y_specs = expect_sequence(y_actuatorLine, "specifications", false);
    if (y_specs) {

      // save off number of towers
      const int numTowers = y_specs.size();

      // deal with processors... Distribute each tower over subsequent procs
      const int numProcs = NaluEnv::self().parallel_size();
      const int divProcTower = std::max(numProcs/numTowers, numProcs);

      // each specification can have multiple machines
      for (size_t ispec = 0; ispec < y_specs.size(); ++ispec) {
        const YAML::Node y_spec = y_specs[ispec];

        ActuatorLinePointDragInfo *actuatorLineInfo = new ActuatorLinePointDragInfo();
        actuatorLineInfo_.push_back(actuatorLineInfo);

        // name
        const YAML::Node theName = y_spec["turbine_name"];
        if ( theName )
          actuatorLineInfo->turbineName_ = theName.as<std::string>() ;
        else
          throw std::runtime_error("ActuatorLinePointDrag: no name provided");

        // processor id; distribute los equally over the number of processors
        actuatorLineInfo->processorId_ = divProcTower > 0 ? ispec % divProcTower : 0;

        // number of points
        get_if_present(y_spec, "number_of_points", actuatorLineInfo->numPoints_, actuatorLineInfo->numPoints_);
      
        // radius and omega
        get_if_present(y_spec, "radius", actuatorLineInfo->radius_, actuatorLineInfo->radius_);
        get_if_present(y_spec, "omega", actuatorLineInfo->omega_, actuatorLineInfo->omega_);
        if ( actuatorLineInfo->omega_ > 0.0 ||  actuatorLineInfo->omega_ < 0.0 )
          actuatorLineMotion_ = true;

        // Gaussian props
        get_if_present(y_spec, "gaussian_decay_radius", actuatorLineInfo->gaussDecayRadius_, actuatorLineInfo->gaussDecayRadius_);

        // tip coordinates of this point
        const YAML::Node tipCoord = y_spec["tip_coordinates"];
        if ( tipCoord )
          actuatorLineInfo->tipCoordinates_ = tipCoord.as<Coordinates>() ;
        else
          throw std::runtime_error("ActuatorLinePointDrag: lacking tip_coordinates");

        // tail coordinates of this point
        const YAML::Node tailCoord = y_spec["tail_coordinates"];
        if ( tailCoord )
          actuatorLineInfo->tailCoordinates_ = tailCoord.as<Coordinates >() ;
        else
          throw std::runtime_error("ActuatorLinePointDrag: lacking tail_coordinates");
      }
    }
  }
}


//--------------------------------------------------------------------------
//-------- setup -----------------------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::setup()
{
  // objective: declare the part, register coordinates; must be before populate_mesh()

}

//--------------------------------------------------------------------------
//-------- initialize ------------------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::initialize()
{
  stk::mesh::BulkData & bulkData = realm_.bulk_data();

  // initialize need to ghost and elems to ghost
  needToGhostCount_ = 0;
  elemsToGhost_.clear();

  // clear actuatorLinePointInfoMap_
  std::map<size_t, ActuatorLinePointDragPointInfo *>::iterator iterPoint;
  for( iterPoint=actuatorLinePointInfoMap_.begin(); iterPoint!=actuatorLinePointInfoMap_.end(); ++iterPoint )
    delete (*iterPoint).second;
  actuatorLinePointInfoMap_.clear();

  bulkData.modification_begin();

  if ( actuatorLineGhosting_ == NULL) {
    // create new ghosting
    std::string theGhostName = "nalu_actuator_line_ghosting";
    actuatorLineGhosting_ = &bulkData.create_ghosting( theGhostName );
  }
  else {
    bulkData.destroy_ghosting(*actuatorLineGhosting_);
  }

  bulkData.modification_end();

  // clear some of the search info
  boundingSphereVec_.clear();
  boundingElementBoxVec_.clear();
  searchKeyPair_.clear();

  // set all of the candidate elements in the search target names
  populate_candidate_elements();

  // create the ActuatorLinePointDragPointInfo
  create_actuator_line_point_info_map();

  // coarse search
  determine_elems_to_ghost();

  // manage ghosting
  manage_ghosting();

  // complete filling in the set of elements connected to the centroid
  complete_search();
}

//--------------------------------------------------------------------------
//-------- execute ---------------------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::execute()
{
  // do we have mesh motion?
  if ( actuatorLineMotion_ )
    initialize();

  // meta/bulk data and nDim
  stk::mesh::MetaData & metaData = realm_.meta_data();
  stk::mesh::BulkData & bulkData = realm_.bulk_data();
  const int nDim = metaData.spatial_dimension();

  // extract fields
  VectorFieldType *coordinates
    = metaData.get_field<VectorFieldType>(stk::topology::NODE_RANK, realm_.get_coordinates_name());
  VectorFieldType *velocity = metaData.get_field<VectorFieldType>(stk::topology::NODE_RANK, "velocity");
  VectorFieldType *actuator_source
    = metaData.get_field<VectorFieldType>(stk::topology::NODE_RANK, "actuator_source");
  VectorFieldType *actuator_source_lhs
    = metaData.get_field<VectorFieldType>(stk::topology::NODE_RANK, "actuator_source_lhs");
  ScalarFieldType *density
    = metaData.get_field<ScalarFieldType>(stk::topology::NODE_RANK, "density");
  // deal with proper viscosity
  const std::string viscName = realm_.is_turbulent() ? "effective_viscosity" : "viscosity";
  ScalarFieldType *viscosity
    = metaData.get_field<ScalarFieldType>(stk::topology::NODE_RANK, viscName);

  // fixed size scratch
  std::vector<double> ws_pointGasVelocity(nDim);
  std::vector<double> ws_elemCentroid(nDim);
  std::vector<double> ws_pointForce(nDim);
  std::vector<double> ws_elemDrag(nDim);
  double ws_pointGasDensity;
  double ws_pointGasViscosity;
  std::vector<double> ws_pointForceLHS(nDim);

  // zero out source term; do this manually since there are custom ghosted entities
  stk::mesh::Selector s_nodes = stk::mesh::selectField(*actuator_source);
  stk::mesh::BucketVector const& node_buckets =
    realm_.get_buckets( stk::topology::NODE_RANK, s_nodes );
  for ( stk::mesh::BucketVector::const_iterator ib = node_buckets.begin() ;
        ib != node_buckets.end() ; ++ib ) {
    stk::mesh::Bucket & b = **ib ;
    const stk::mesh::Bucket::size_type length   = b.size();
    double * actSrc = stk::mesh::field_data(*actuator_source, b);
    double * actSrcLhs = stk::mesh::field_data(*actuator_source_lhs, b);
    for ( stk::mesh::Bucket::size_type k = 0 ; k < length ; ++k ) {
      const int offSet = k*nDim;
      for ( int j = 0; j < nDim; ++j ) {
        actSrc[offSet+j] = 0.0;
        actSrcLhs[offSet+j] = 0.0;
      }
    }
  }

  // parallel communicate data to the ghosted elements; again can communicate points to element ranks
  if ( NULL != actuatorLineGhosting_ ) {
    std::vector< const stk::mesh::FieldBase *> ghostFieldVec;
    // fields that are needed
    ghostFieldVec.push_back(coordinates);
    ghostFieldVec.push_back(velocity);
    ghostFieldVec.push_back(viscosity);
    stk::mesh::communicate_field_data(*actuatorLineGhosting_, ghostFieldVec);
  }

  // loop over map and assemble source terms
  std::map<size_t, ActuatorLinePointDragPointInfo *>::iterator iterPoint;
  for (iterPoint  = actuatorLinePointInfoMap_.begin();
       iterPoint != actuatorLinePointInfoMap_.end();
       ++iterPoint) {

    // actuator line info object of interest
    ActuatorLinePointDragPointInfo * infoObject = (*iterPoint).second;

    //==========================================================================
    // extract the best element; compute drag given this velocity, property, etc
    // this point drag value will be used by all other elements below
    //==========================================================================
    stk::mesh::Entity bestElem = infoObject->bestElem_;
    int nodesPerElement = bulkData.num_nodes(bestElem);

    // resize some work vectors
    resize_std_vector(nDim, ws_coordinates_, bestElem, bulkData);
    resize_std_vector(nDim, ws_velocity_, bestElem, bulkData);
    resize_std_vector(1, ws_viscosity_, bestElem, bulkData);
    resize_std_vector(1, ws_density_, bestElem, bulkData);

    // gather nodal data to element nodes; both vector and scalar; coords are used in determinant calc
    gather_field(nDim, &ws_coordinates_[0], *coordinates, bulkData.begin_nodes(bestElem),
                 nodesPerElement);
    gather_field_for_interp(nDim, &ws_velocity_[0], *velocity, bulkData.begin_nodes(bestElem),
                            nodesPerElement);
    gather_field_for_interp(1, &ws_viscosity_[0], *viscosity, bulkData.begin_nodes(bestElem),
                            nodesPerElement);
    gather_field_for_interp(1, &ws_density_[0], *density, bulkData.begin_nodes(bestElem),
                            nodesPerElement);

    // compute volume
    double bestElemVolume = compute_volume(nDim, bestElem, bulkData);

    // interpolate velocity
    interpolate_field(nDim, bestElem, bulkData, &(infoObject->isoParCoords_[0]),
                      &ws_velocity_[0], &ws_pointGasVelocity[0]);

    // interpolate viscosity
    interpolate_field(1, bestElem, bulkData, &(infoObject->isoParCoords_[0]),
                      &ws_viscosity_[0], &ws_pointGasViscosity);

    // interpolate density
    interpolate_field(1, bestElem, bulkData, &(infoObject->isoParCoords_[0]),
                      &ws_density_[0], &ws_pointGasDensity);

    // point drag calculation
    compute_point_drag(nDim, infoObject->radius_, &infoObject->velocity_[0], &ws_pointGasVelocity[0], ws_pointGasViscosity,
                       ws_pointGasDensity, &ws_pointForce[0], &ws_pointForceLHS[0]);

    // assemble nodal lhs quantity for best elem
    assemble_lhs_to_best_elem_nodes(nDim, bestElem, bulkData, bestElemVolume, &ws_pointForceLHS[0],
                              *actuator_source_lhs);

    // get the vector of elements
    std::set<stk::mesh::Entity> nodeVec = infoObject->nodeVec_;

    spread_actuator_force_to_node_vec(nDim, nodeVec, ws_pointForce, &(infoObject->centroidCoords_[0]), *coordinates, *actuator_source, infoObject->gaussDecayRadius_);

  }

  // parallel assemble (contributions from ghosted and locally owned)
  std::vector<const stk::mesh::FieldBase*> sumFieldVec;
  sumFieldVec.push_back(actuator_source);
  sumFieldVec.push_back(actuator_source_lhs);
  stk::mesh::parallel_sum_including_ghosts(bulkData, sumFieldVec);

}
//--------------------------------------------------------------------------
//-------- populate_candidate_elements -------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::populate_candidate_elements()
{
  stk::mesh::MetaData & metaData = realm_.meta_data();
  stk::mesh::BulkData & bulkData = realm_.bulk_data();

  const int nDim = metaData.spatial_dimension();

  // fields
  VectorFieldType *coordinates = metaData.get_field<VectorFieldType>(stk::topology::NODE_RANK, realm_.get_coordinates_name());

  // point data structures
  Point minCorner, maxCorner;

  // extract part
  stk::mesh::PartVector searchParts;
  for ( size_t k = 0; k < searchTargetNames_.size(); ++k ) {
    stk::mesh::Part *thePart = metaData.get_part(searchTargetNames_[k]);
    if ( NULL != thePart )
      searchParts.push_back(thePart);
    else
      throw std::runtime_error("ActuatorLinePointDrag: Part is null" + searchTargetNames_[k]);
  }

  // selector and bucket loop
  stk::mesh::Selector s_locally_owned = metaData.locally_owned_part()
    &stk::mesh::selectUnion(searchParts);

  stk::mesh::BucketVector const& elem_buckets =
    realm_.get_buckets( stk::topology::ELEMENT_RANK, s_locally_owned );

  for ( stk::mesh::BucketVector::const_iterator ib = elem_buckets.begin();
        ib != elem_buckets.end() ; ++ib ) {

    stk::mesh::Bucket & b = **ib;

    const stk::mesh::Bucket::size_type length   = b.size();

    for ( stk::mesh::Bucket::size_type k = 0 ; k < length ; ++k ) {

      // get element
      stk::mesh::Entity elem = b[k];

      // initialize max and min
      for (int j = 0; j < nDim; ++j ) {
        minCorner[j] = +1.0e16;
        maxCorner[j] = -1.0e16;
      }

      // extract elem_node_relations
      stk::mesh::Entity const* elem_node_rels = bulkData.begin_nodes(elem);
      const int num_nodes = bulkData.num_nodes(elem);

      for ( int ni = 0; ni < num_nodes; ++ni ) {
        stk::mesh::Entity node = elem_node_rels[ni];

        // pointers to real data
        const double * coords = stk::mesh::field_data(*coordinates, node );

        // check max/min
        for ( int j = 0; j < nDim; ++j ) {
          minCorner[j] = std::min(minCorner[j], coords[j]);
          maxCorner[j] = std::max(maxCorner[j], coords[j]);
        }
      }

      // setup ident
      stk::search::IdentProc<uint64_t,int> theIdent(bulkData.identifier(elem), NaluEnv::self().parallel_rank());

      // create the bounding point box and push back
      boundingElementBox theBox(Box(minCorner,maxCorner), theIdent);
      boundingElementBoxVec_.push_back(theBox);
    }
  }
}

//--------------------------------------------------------------------------
//-------- determine_elems_to_ghost ----------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::determine_elems_to_ghost()
{
  stk::mesh::BulkData & bulkData = realm_.bulk_data();

  stk::search::coarse_search(boundingSphereVec_, boundingElementBoxVec_, searchMethod_,
                             NaluEnv::self().parallel_comm(), searchKeyPair_);

  // sort to avoid possible elemsToGhost_ ordering in change_ghosting() or actuatorLinePointInfo->nodeVec_.insert(node)
  std::sort(searchKeyPair_.begin(), searchKeyPair_.end());
  
  // lowest effort is to ghost elements to the owning rank of the point; can just as easily do the opposite
  std::vector<std::pair<boundingSphere::second_type, boundingElementBox::second_type> >::const_iterator ii;
  for( ii=searchKeyPair_.begin(); ii!=searchKeyPair_.end(); ++ii ) {

    const uint64_t theBox = ii->second.id();
    unsigned theRank = NaluEnv::self().parallel_rank();
    const unsigned pt_proc = ii->first.proc();
    const unsigned box_proc = ii->second.proc();
    
    if ( (box_proc == theRank) && (pt_proc != theRank) ) {

      // Send box to pt proc

      // find the element
      stk::mesh::Entity theElemMeshObj = bulkData.get_entity(stk::topology::ELEMENT_RANK, theBox);
      if ( !(bulkData.is_valid(theElemMeshObj)) )
        throw std::runtime_error("no valid entry for element");

      // new element to ghost counter
      needToGhostCount_++;

      // deal with elements to push back to be ghosted
      stk::mesh::EntityProc theElemPair(theElemMeshObj, pt_proc);
      elemsToGhost_.push_back(theElemPair);
    }
  }
}

//--------------------------------------------------------------------------
//-------- create_actuator_line_point_info_map -----------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::create_actuator_line_point_info_map() {

  const double currentTime = realm_.get_current_time();

  stk::mesh::MetaData & metaData = realm_.meta_data();
  const int nDim = metaData.spatial_dimension();

  for ( size_t k = 0; k < actuatorLineInfo_.size(); ++k ) {

    const ActuatorLinePointDragInfo *actuatorLineInfo = actuatorLineInfo_[k];

    int processorId = actuatorLineInfo->processorId_;
    if ( processorId == NaluEnv::self().parallel_rank() ) {

      // define a point that will hold the centroid
      Point centroidCoords;

     // determine the distance between points and line centroid (for rotation)
      double dx[3] = {};
      double lineCentroid[3] = {};
      double velocity[3] = {};
      double currentCoords[3] = {};

      std::vector<double> tipC(nDim);
      tipC[0] = actuatorLineInfo->tipCoordinates_.x_;
      tipC[1] = actuatorLineInfo->tipCoordinates_.y_;

      std::vector<double> tailC(nDim);
      tailC[0] = actuatorLineInfo->tailCoordinates_.x_;
      tailC[1] = actuatorLineInfo->tailCoordinates_.y_;

      if ( nDim > 2) {
        tipC[2] = actuatorLineInfo->tipCoordinates_.z_;
        tailC[2] = actuatorLineInfo->tailCoordinates_.z_;
      }

      const int numPoints = actuatorLineInfo->numPoints_;
      const int denomPts = std::max(numPoints - 1, 1);
      for ( int j = 0; j < nDim; ++j ) {
        dx[j] = (tipC[j] - tailC[j])/(double)denomPts;
        lineCentroid[j] = (tipC[j] + tailC[j])/2.0;
      }

      // loop over all points
      for ( int np = 0; np < numPoints; ++np ) {
        // extract current localPointId; increment for next one up...
        size_t localPointId = localPointId_++;
        stk::search::IdentProc<uint64_t,int> theIdent(localPointId, NaluEnv::self().parallel_rank());

        // set model coordinates
        for ( int j = 0; j < nDim; ++j )
          currentCoords[j] = tailC[j] + np*dx[j];

        // move the coordinates; set the velocity... may be better on the lineInfo object
        set_current_coordinates(lineCentroid, currentCoords, actuatorLineInfo->omega_, currentTime);
        set_current_velocity(lineCentroid, currentCoords, velocity, actuatorLineInfo->omega_);

        for ( int j = 0; j < nDim; ++j )
          centroidCoords[j] = currentCoords[j];

        // create the bounding point sphere and push back
        boundingSphere theSphere( Sphere(centroidCoords, actuatorLineInfo->radius_), theIdent);
        boundingSphereVec_.push_back(theSphere);

        // create the point info and push back to map
        ActuatorLinePointDragPointInfo *actuatorLinePointInfo
          = new ActuatorLinePointDragPointInfo(localPointId, centroidCoords,
                                      actuatorLineInfo->radius_, actuatorLineInfo->omega_,
                                      actuatorLineInfo->gaussDecayRadius_, velocity);
        actuatorLinePointInfoMap_[localPointId] = actuatorLinePointInfo;
      }
    }
  }
}



//--------------------------------------------------------------------------
//-------- set_current_coordinates -----------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::set_current_coordinates(
  double *lineCentroid, double *centroidCoords, const double &omega, const double &currentTime)
{
  // hack for rotation only in the y-z plane
  const double cY = centroidCoords[1] - lineCentroid[1];
  const double cZ = centroidCoords[2] - lineCentroid[2];
  const double sinOT = sin(omega*currentTime);
  const double cosOT = cos(omega*currentTime);

  // no change to x-coordinates
  centroidCoords[1] = sinOT*cZ + cosOT*cY + lineCentroid[1];
  centroidCoords[2] = cosOT*cZ - sinOT*cY + lineCentroid[2];
}

//--------------------------------------------------------------------------
//-------- set_current_velocity --------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::set_current_velocity(
  double *lineCentroid, const double *centroidCoords, double *velocity, const double &omega)
{
  // hack for rotation only in the y-z plane
  double cY = centroidCoords[1] - lineCentroid[1];
  double cZ = centroidCoords[2] - lineCentroid[2];

  velocity[0] = 0.0;
  velocity[1] = -omega*cZ;
  velocity[2] = +omega*cY;
}

//--------------------------------------------------------------------------
//-------- manage_ghosting -------------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::manage_ghosting()
{
  stk::mesh::BulkData & bulkData = realm_.bulk_data();

  // check for ghosting need
  uint64_t g_needToGhostCount = 0;
  stk::all_reduce_sum(NaluEnv::self().parallel_comm(), &needToGhostCount_, &g_needToGhostCount, 1);
  if (g_needToGhostCount > 0) {
    NaluEnv::self().naluOutputP0() << "ActuatorLinePointDrag alg will ghost a number of entities: "
                                   << g_needToGhostCount  << std::endl;    
    bulkData.modification_begin();
    bulkData.change_ghosting( *actuatorLineGhosting_, elemsToGhost_);
    bulkData.modification_end();
  }
  else {
    NaluEnv::self().naluOutputP0() << "ActuatorLinePointDrag alg will NOT ghost entities: " << std::endl;
  }
}

//--------------------------------------------------------------------------
//-------- complete_search -------------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::complete_search()
{
  stk::mesh::MetaData & metaData = realm_.meta_data();
  stk::mesh::BulkData & bulkData = realm_.bulk_data();
  const int nDim = metaData.spatial_dimension();

  // extract fields
  VectorFieldType *coordinates
    = metaData.get_field<VectorFieldType>(stk::topology::NODE_RANK, realm_.get_coordinates_name());

  // now proceed with the standard search
  std::vector<std::pair<boundingSphere::second_type, boundingElementBox::second_type> >::const_iterator ii;
  for( ii=searchKeyPair_.begin(); ii!=searchKeyPair_.end(); ++ii ) {

    const uint64_t thePt = ii->first.id();
    const uint64_t theBox = ii->second.id();
    const unsigned theRank = NaluEnv::self().parallel_rank();
    const unsigned pt_proc = ii->first.proc();

    // check if I own the point...
    if ( theRank == pt_proc ) {

      // yes, I own the point...

      // proceed as required; all elements should have already been ghosted via the coarse search
      stk::mesh::Entity elem = bulkData.get_entity(stk::topology::ELEMENT_RANK, theBox);
      if ( !(bulkData.is_valid(elem)) )
        throw std::runtime_error("no valid entry for element");

      // find the point data structure
      std::map<size_t, ActuatorLinePointDragPointInfo *>::iterator iterPoint;
      iterPoint=actuatorLinePointInfoMap_.find(thePt);
      if ( iterPoint == actuatorLinePointInfoMap_.end() )
        throw std::runtime_error("no valid entry for actuatorLinePointInfoMap_");

      // extract the point object and push back the element to either the best
      // candidate or the standard vector of elements
      ActuatorLinePointDragPointInfo *actuatorLinePointInfo = iterPoint->second;

      // extract topo and master element for this topo
      const stk::mesh::Bucket &theBucket = bulkData.bucket(elem);
      const stk::topology &elemTopo = theBucket.topology();
      MasterElement *meSCS = sierra::nalu::MasterElementRepo::get_surface_master_element(elemTopo);
      const int nodesPerElement = meSCS->nodesPerElement_;

      // gather elemental coords
      std::vector<double> elementCoords(nDim*nodesPerElement);
      gather_field_for_interp(nDim, &elementCoords[0], *coordinates, bulkData.begin_nodes(elem),
                   nodesPerElement);


      // find isoparametric points
      std::vector<double> isoParCoords(nDim);
      const double nearestDistance = meSCS->isInElement(&elementCoords[0],
                                                        &(actuatorLinePointInfo->centroidCoords_[0]),
                                                        &(isoParCoords[0]));

      // save off best element and its isoparametric coordinates for this point
      if ( nearestDistance < actuatorLinePointInfo->bestX_ ) {
        actuatorLinePointInfo->bestX_ = nearestDistance;
        actuatorLinePointInfo->isoParCoords_ = isoParCoords;
        actuatorLinePointInfo->bestElem_ = elem;
      }
      // extract elem_node_relations
      stk::mesh::Entity const* elem_node_rels = bulkData.begin_nodes(elem);
      const unsigned num_nodes = bulkData.num_nodes(elem);
      for (unsigned inode = 0; inode < num_nodes; inode++) {
          stk::mesh::Entity node = elem_node_rels[inode];
          actuatorLinePointInfo->nodeVec_.insert(node);
      }
    }
    else {
      // not this proc's issue
    }
  }
}

//--------------------------------------------------------------------------
//-------- resize_std_vector -----------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::resize_std_vector(
  const int &sizeOfField,
  std::vector<double> &theVector,
  stk::mesh::Entity elem,
  const stk::mesh::BulkData & bulkData)
{
  const stk::topology &elemTopo = bulkData.bucket(elem).topology();
  MasterElement *meSCS = sierra::nalu::MasterElementRepo::get_surface_master_element(elemTopo);
  const int nodesPerElement = meSCS->nodesPerElement_;
  theVector.resize(nodesPerElement*sizeOfField);
}

//--------------------------------------------------------------------------
//-------- gather_field ----------------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::gather_field(
  const int &sizeOfField,
  double *fieldToFill,
  const stk::mesh::FieldBase &stkField,
  stk::mesh::Entity const* elem_node_rels,
  const int &nodesPerElement)
{
  for ( int ni = 0; ni < nodesPerElement; ++ni ) {
    stk::mesh::Entity node = elem_node_rels[ni];
    const double * theField = (double*)stk::mesh::field_data(stkField, node );
    for ( int j = 0; j < sizeOfField; ++j ) {
      const int offSet = ni*sizeOfField+j;
      fieldToFill[offSet] = theField[j];
    }
  }
}

//--------------------------------------------------------------------------
//-------- gather_field_for_interp -----------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::gather_field_for_interp(
  const int &sizeOfField,
  double *fieldToFill,
  const stk::mesh::FieldBase &stkField,
  stk::mesh::Entity const* elem_node_rels,
  const int &nodesPerElement)
{
  for ( int ni = 0; ni < nodesPerElement; ++ni ) {
    stk::mesh::Entity node = elem_node_rels[ni];
    const double * theField = (double*)stk::mesh::field_data(stkField, node );
    for ( int j = 0; j < sizeOfField; ++j ) {
      const int offSet = j*nodesPerElement + ni;
      fieldToFill[offSet] = theField[j];
    }
  }
}

//--------------------------------------------------------------------------
//-------- compute_volume --------------------------------------------------
//--------------------------------------------------------------------------
double
ActuatorLinePointDrag::compute_volume(
  const int &nDim,
  stk::mesh::Entity elem,
  const stk::mesh::BulkData & bulkData)
{
  // extract master element from the bucket in which the element resides
  const stk::topology &elemTopo = bulkData.bucket(elem).topology();
  MasterElement *meSCV = sierra::nalu::MasterElementRepo::get_volume_master_element(elemTopo);
  const int numScvIp = meSCV->numIntPoints_;

  // compute scv for this element
  ws_scv_volume_.resize(numScvIp);
  double scv_error = 0.0;
  meSCV->determinant(1, &ws_coordinates_[0], &ws_scv_volume_[0], &scv_error);

  double elemVolume = 0.0;
  for ( int ip = 0; ip < numScvIp; ++ip ) {
    elemVolume += ws_scv_volume_[ip];
  }
  return elemVolume;
}

//--------------------------------------------------------------------------
//-------- interpolate_field -----------------------------------------------
//--------------------------------------------------------------------------
void
ActuatorLinePointDrag::interpolate_field(
  const int &sizeOfField,
  stk::mesh::Entity elem,
  const stk::mesh::BulkData & bulkData,
  double *isoParCoords,
  const double *fieldAtNodes,
  double *pointField)
{
  // extract master element from the bucket in which the element resides
  const stk::topology &elemTopo = bulkData.bucket(elem).topology();
  MasterElement *meSCS = sierra::nalu::MasterElementRepo::get_surface_master_element(elemTopo);

  // interpolate velocity to this best point
  meSCS->interpolatePoint(
    sizeOfField,
    isoParCoords,
    fieldAtNodes,
    pointField);
}

//--------------------------------------------------------------------------
//-------- compute_radius ------------------------------------------------
//--------------------------------------------------------------------------
double
ActuatorLinePointDrag::compute_radius(
  const int &nDim,
  const double *elemCentroid,
  const double *pointCentroid)
{
  double radius = 0.0;
  for ( int j = 0; j < nDim; ++j )
    radius += std::pow(elemCentroid[j] - pointCentroid[j], 2);
  radius = std::sqrt(radius);
  return radius;
}

//--------------------------------------------------------------------------
//-------- assemble_source_to_nodes ----------------------------------------
//-------------------------------------------------------------------------
void
ActuatorLinePointDrag::assemble_lhs_to_best_elem_nodes(
  const int &nDim,
  stk::mesh::Entity elem,
  const stk::mesh::BulkData & bulkData,
  const double &elemVolume,
  const double *dragLHS,
  stk::mesh::FieldBase &actuator_source_lhs)
{
  // extract master element from the bucket in which the element resides
  const stk::topology &elemTopo = bulkData.bucket(elem).topology();
  MasterElement *meSCV = sierra::nalu::MasterElementRepo::get_volume_master_element(elemTopo);
  const int numScvIp = meSCV->numIntPoints_;

  // extract elem_node_relations
  stk::mesh::Entity const* elem_node_rels = bulkData.begin_nodes(elem);

  // assemble to nodes
  const int *ipNodeMap = meSCV->ipNodeMap();
  for ( int ip = 0; ip < numScvIp; ++ip ) {

    // nearest node to ip
    const int nearestNode = ipNodeMap[ip];

    // extract node and pointer to source term
    stk::mesh::Entity node = elem_node_rels[nearestNode];
    double * sourceTermLHS = (double*)stk::mesh::field_data(actuator_source_lhs, node );

    // nodal weight based on volume weight
    const double nodalWeight = ws_scv_volume_[ip]/elemVolume;
    for(int j = 0; j < nDim; j++)
        sourceTermLHS[j] += nodalWeight*dragLHS[j];
  }
}


// Spread actuator force to nodes
void
ActuatorLinePointDrag::spread_actuator_force_to_node_vec(
  const int &nDim,
  std::set<stk::mesh::Entity>& nodeVec,
  const std::vector<double>& actuator_force,
  const double * actuator_node_coordinates,
  const stk::mesh::FieldBase & coordinates,
  stk::mesh::FieldBase & actuator_source,
  const double& epsilon
)
{
    std::vector<double> ws_nodeForce(nDim);
    // iterate over node vector, calculate and apply source term
    std::set<stk::mesh::Entity>::iterator iNode;
    for (iNode = nodeVec.begin(); iNode != nodeVec.end(); ++iNode ) {

      stk::mesh::Entity node = *iNode;
      const double * node_coords = (double*)stk::mesh::field_data(coordinates, node );
      const double radius = compute_radius(nDim, node_coords, actuator_node_coordinates);
      // project the force to this node with projection function
      compute_node_drag_given_radius(nDim, radius, epsilon, &actuator_force[0], &ws_nodeForce[0]);
      double * sourceTerm = (double*)stk::mesh::field_data(actuator_source, node );
      for ( int j=0; j < nDim; ++j ) sourceTerm[j] = ws_nodeForce[j];

    }
}

} // namespace nalu
} // namespace Sierra
